<?php
namespace common\corelib\plugins\rabbitmq;

use common\models\mongodb\EventPublish;
use common\services\mq\PublishService;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class rabbitmq
 *
 * @author  无花
 * 2017-7-18
 *
 * @todo  动态删除exchannel  动态删除queue  增加用户  增加权限  生产消费解耦
 *
 */


class rabbitmq
{
    const FANOUT = 'fanout';

    const TOPIC = 'topic';

    const DIRECT = 'direct';

    public $host = '192.168.0.100';

    public $port = 5672;

    public $vhost = '/';

    public $user = 'admin';

    public $password = 'admin123';

    private $connection;

    private $channel;

    public $queue_name;

    public $msg_data = 'hello queue';

    public $rout_error_msg = '路由规则不能为空';

    public function __construct($config = array())
    {
        if (empty($config)) {
            $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost);
        } else {
            $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
        }

        $this->channel = $this->connection->channel();

    }

    /**
     * 生成一个交换机exchannel
     *
     * 没有返回值,rabbitmq内部生成
     *
     * @param string $name 交换机名称
     * @param string $typen 交换机类型   路由类型rounting key,fanout(广播),direct(路由),topic(主题)   默认fanout
     * @param bool   $passive 检查是否存在
     * @param bool   $durable 是否持久————决定机器重启时候是否自动恢复
     * @param bool   $auto_delete 是否自动删除(rabbitmq底层默认true)
     *
     * @return bool
     */
    public function declareExchange($name, $type = self::FANOUT, $passive = false, $durable = false, $auto_delete = false)
    {
        $this->channel->exchange_declare($name, $type, $passive, $durable, $auto_delete);

        return true;
    }

    /**
     * 生成一个队列queue
     *
     * @param string $queue_name 队列名称.默认为空,系统自动生成.
     * @param string $passive 检查队列名称是否存在  有就返回给你,没有报错..如此鸡肋  没有用queue_declare的方法会自动创建
     * @param bool   $auto_dele 是否自动删除 默认自动删除
     * @param bool   $durable 是否是持久
     * @param bool   $exclusive 是否排外,排外就是代表只能在当前channel下通信.排外的通道当连接断开队列会消失,不管是不是持久(很重要).
     * @param bool   $argument 扩展参数
     *
     * @return string
     */
    public function declareQueue($queue_name = '', $passive = false, $durable = false, $exclusive = false, $auto_dele = true)
    {
        if (empty($queue_name)) {
            list($queue_name, ,) = $this->channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_dele);
        } else {
            $this->channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_dele);
        }

        return $queue_name;
    }

    /**
     * 队列绑定
     *
     * @param stirng $queue_name 队列名称
     * @param string $exchannel_name 交换机名称
     * @param string $routing_key 路由规则
     *
     * @return bool
     */
    public function queueBind($queue_name, $exchannel_name, $routing_key = '')
    {
        $this->channel->queue_bind($queue_name, $exchannel_name, $routing_key);

        return true;
    }


    /**
     * 生成消息
     *
     * @param string $data string 消息内容
     * @param array  $properties array
     *
     * @return AMQPMessage
     */
    public function amqpMessage($data, $properties = [])
    {
        $msg = new AMQPMessage($data, $properties);

        return $msg;
    }

    /**
     * 推送消息——生产者提交
     *
     * @param string $msg 传递内容
     * @param string $exchannel 交换机
     * @param string $routing_key 路由规则
     * @param mixed  $mandatory
     * @param bool   $immediate 延时设定,可以延时队列
     * @param mixed  $ticket
     *
     * @return bool
     *
     */
    public function basicPublish($msg, $exchannel, $routing_key, $mandatory = false, $immediate = false, $ticket = null)
    {
        $this->channel->basic_publish($msg, $exchannel, $routing_key, $mandatory, $immediate, $ticket);

        return true;
    }

    /**
     * 设定队列一次处理的消息条数————主要用于ack确定模式,保证一次只处理一条消息
     *
     * @param string $prefetch_size
     * @param int    $prefetch_count 一次处理message数量
     * @param string $a_global
     *
     * @return bool
     */
    public function basicQos()
    {
        $this->channel->basic_qos(null, 1, null);

        return true;
    }

    /**
     * 推送消息——消费者提交
     *
     * @param string $queue_name 队列名称
     * @param string $consumer_tag 消费者的标记,内部生成一个唯一标识
     * @param bool   $no_local = false,
     * @param bool   $no_ack = false, 是不是需要ack来确定 false关闭ack自动应答,true启动主动应答
     * @param bool   $exclusive = false,  是否要排外
     * @param bool   $nowait = false,    需不需要等待,就是提交之后不管连接是不是断开,我先断了.
     * @param mixed  $callback = null,   回调函数
     *
     * @return bool
     */
    public function basicConsume($queue_name, $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback)
    {
        $this->channel->basic_consume($queue_name, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback);

        return true;
    }

    public function wait()
    {
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function channelClose()
    {
        $this->channel->close();
    }

    public function connectionClose()
    {
        $this->connection->close();
    }

    public function allClose()
    {
        $this->channel->close();
        $this->connection->close();
    }

    /****************************一般模式*******************************************/

    /**
     * 一般模式——生产者
     * 1 建立一个不自动删除队列
     * 2 队列放入消息
     *
     * @param string $queue_name 队列名称
     * @param string $msg_data 消息内容
     * @param boolean $close_connection 关闭进程
     */
    public function commonProduct($queue_name, $msg_data, $close_connection=true)
    {

        $this->declareQueue($queue_name, false, false, false, false);
        $msg = $this->amqpMessage($msg_data);
        $this->channel->basic_publish($msg, '', $queue_name);

        if ($close_connection) $this->allClose();
    }

    /**
     * 一般模式——消费者
     * 1 连接队列
     * 2 处理消息
     *
     * @param string $queue_name 队列名称
     * @param string $callback_consumer 回调函数
     *
     */
    public function commonConsume($queue_name, $callback_consumer)
    {
        $callback = function ($msg) use ($callback_consumer) {
            call_user_func($callback_consumer, $msg);
        };

        $this->declareQueue($queue_name, false, false, false, false);
        $this->basicConsume($queue_name, '', false, true, false, false, $callback);

        $this->wait();
        $this->allClose();
    }

    /****************************工作模式模式*******************************************/
    /**
     * 工作模式——生产者
     * 1 建立一个持久队列
     * 2 队列放入消息——队列开启工作模式
     * 3 推送到交换机
     *
     * @param string $queue_name 队列名称
     * @param string $msg_data 消息内容
     * @param boolean $close_connection 关闭进程
     *
     */
    public function workProduct($queue_name, $msg_data, $close_connection=true)
    {

        $this->declareQueue($queue_name, false, true, false, false);

        if (empty($msg_data)) {
            $msg_data = $this->msg_data;
        } elseif (!empty($msg_data)) {
            /* /START/====================== add by 郭靖 2017/08/04 - 添加自动生成发布ID ====================== */

            // 生成当前消息的发布ID
            $temp_msg_data = json_decode($msg_data, true);
            if (!isset($temp_msg_data['mq_pub_id']) && isset($temp_msg_data['topic']) && !empty($temp_msg_data['topic'])) {
                $temp_msg_data['mq_pub_id'] = md5($temp_msg_data['topic']).time();
                $msg_data = json_encode($temp_msg_data);
            }

            /* /END/====================== add by 郭靖 2017/08/04 - 添加自动生成发布ID ====================== */
        }

        /* /START/====================== add by 郭靖 2017/08/04 - 添加队列应答模式 ====================== */

        // 回调mongo推送落地记录
        $MONGO_PUBLISH_LOG = new EventPublish();
        $MONGO_PUBLISH_LOG->savePublishRecord($msg_data, 1, 'work'); // 调用mongo记录回调推送结果

        // mq成功应答时回调函数 - 推送消息成功
//        $this->channel->set_ack_handler(
//            function (AMQPMessage $message) {
//                // 回调mongo推送落地记录
//                $MONGO_PUBLISH_LOG = new EventPublish();
//                $MONGO_PUBLISH_LOG->savePublishRecord($message->body, 1, 'work'); // 调用mongo记录回调推送结果
//            }
//        );

        // mq没有应答时的回调函数 - 推送消息丢失
        $this->channel->set_nack_handler(
            function (AMQPMessage $message) {
                // 回调mongo推送落地记录
                $MONGO_PUBLISH_LOG = new EventPublish();
                $MONGO_PUBLISH_LOG->savePublishRecord($message->body, 2, 'work'); // 调用mongo记录回调推送结果
            }
        );

        // mq无法进入队列时的回调 - 消息入队失败
        $this->channel->set_return_listener(
            function ($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $message) {
                sleep(1);
                // 回调mongo推送落地记录
                $MONGO_PUBLISH_LOG = new EventPublish();
                $MONGO_PUBLISH_LOG->savePublishRecord($message->body, 0, 'work', $replyCode, $replyText, $exchange, $routingKey); // 调用mongo记录回调推送结果
            }
        );

        $this->channel->confirm_select(); // 开启应答模式

        /* /END/====================== add by 郭靖 2017/08/04 - 添加队列应答模式 ====================== */

        $msg = $this->amqpMessage($msg_data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        $this->channel->basic_publish($msg, '', $queue_name, true);

        if ($close_connection) $this->channel->wait_for_pending_acks_returns(); // 等待mq应答消息推送

        if ($close_connection) $this->allClose();
    }

    /**
     * ================================================================
     * 工作模式——生产者 (带循环推送)
     * ================================================================
     * --------------------------------------------------
     * 1 - 如果是单条消息推送, msg_data只需要传单个消息的字符
     * $msg_data example:
     * {\"params\":{\"page\":1,\"limit\":100,\"total\":3,\"vendor_user_id\":271},\"topic\":\"stock.stock_purchase.es_push_vendor\"}
     * ---------------------------------------------------
     * 2 - 如果是多条批量推送, 需要把所有消息都放在msg_data数组里面传递过来
     * [
     *    {\"params\":{\"page\":1,\"limit\":100,\"total\":3,\"vendor_user_id\":271},\"topic\":\"stock.stock_purchase.es_push_vendor\"}
     *    {\"params\":{\"page\":2,\"limit\":100,\"total\":3,\"vendor_user_id\":271},\"topic\":\"stock.stock_purchase.es_push_vendor\"}
     *    {\"params\":{\"page\":3,\"limit\":100,\"total\":3,\"vendor_user_id\":271},\"topic\":\"stock.stock_purchase.es_push_vendor\"}
     * ]
     * ---------------------------------------------------
     *
     * @author 郭靖
     * @date 2017/08/06
     *
     * @param string $queue_name 队列名称
     * @param string $msg_data 消息内容
     * @param boolean $close_connection 关闭进程
     * @param boolean $retry 是否重新推
     *
     */
    public function workProductMix($queue_name, $msg_data, $close_connection=true, $retry=false)
    {

        $this->declareQueue($queue_name, false, true, false, false);

        if (empty($msg_data)) {
            $msg_data = $this->msg_data;
        } elseif (!empty($msg_data) && !$retry) {
            /* /START/====================== add by 郭靖 2017/08/04 - 添加自动生成发布ID ====================== */

            $MONGO_PUBLISH_LOG = new EventPublish();

            // 如果$msg_data是json字符就使用单推
            if (is_string($msg_data)) {
                // 生成当前消息的发布ID
                $temp_msg_data = json_decode($msg_data, true);
                if (!isset($temp_msg_data['mq_pub_id']) && isset($temp_msg_data['topic']) && !empty($temp_msg_data['topic'])) {
                    $temp_msg_data['mq_pub_id'] = md5($temp_msg_data['topic']).time();
                    $msg_data = json_encode($temp_msg_data);
                }

                // 回调mongo推送落地记录
                $MONGO_PUBLISH_LOG->savePublishRecord($msg_data, 1, 'work'); // 调用mongo记录回调推送结果
            } else {
                // 如果是数组就使用批量推送
                foreach ($msg_data as $key=>&$msgs) {

                    // 生成当前消息的发布ID
                    $temp_msg_data = json_decode($msgs, true);
                    if (!isset($temp_msg_data['mq_pub_id']) && isset($temp_msg_data['topic']) && !empty($temp_msg_data['topic'])) {
                        $temp_msg_data['mq_pub_id'] = md5($temp_msg_data['topic']).time().($key+1);
                        $msgs = json_encode($temp_msg_data);
                    }

                    // 回调mongo推送落地记录
                    $MONGO_PUBLISH_LOG->savePublishRecord($msgs, 1, 'work'); // 调用mongo记录回调推送结果
                }
            }

            /* /END/====================== add by 郭靖 2017/08/04 - 添加自动生成发布ID ====================== */
        }

        /* /START/====================== add by 郭靖 2017/08/04 - 添加队列应答模式 ====================== */

        // mq没有应答时的回调函数 - 推送消息丢失
        $this->channel->set_nack_handler(
            function (AMQPMessage $message) {
                // 回调mongo推送落地记录
                $MONGO_PUBLISH_LOG = new EventPublish();
                $MONGO_PUBLISH_LOG->savePublishRecord($message->body, 2, 'work'); // 调用mongo记录回调推送结果
            }
        );

        // mq无法进入队列时的回调 - 消息入队失败
        $this->channel->set_return_listener(
            function ($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $message) {
                // 回调mongo推送落地记录
                $MONGO_PUBLISH_LOG = new EventPublish();
                $MONGO_PUBLISH_LOG->savePublishRecord($message->body, 0, 'work', $replyCode, $replyText, $exchange, $routingKey); // 调用mongo记录回调推送结果
            }
        );

        $this->channel->confirm_select(); // 开启应答模式

        /* /END/====================== add by 郭靖 2017/08/04 - 添加队列应答模式 ====================== */

        // 如果$msg_data是json字符就使用单推
        if (is_string($msg_data)) {
            $msg = $this->amqpMessage($msg_data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            $this->channel->basic_publish($msg, '', $queue_name, true);
        } else {
            // 如果是数组就使用批量推送
            foreach ($msg_data as $msgs_string) {
                $msg = $this->amqpMessage($msgs_string, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
                $this->channel->basic_publish($msg, '', $queue_name, true);
            }
        }

        if ($close_connection) $this->channel->wait_for_pending_acks_returns(); // 等待mq应答消息推送

        if ($close_connection) $this->allClose();
    }

    /**
     * 工作模式——消费者
     * 1 连接队列
     * 2 处理消息
     *
     * @param string $queue_name 队列名称
     * @param string $callback_consumer 回调函数
     *
     */
    public function workConsume($queue_name, $callback_consumer)
    {

        $callback = function ($msg) use ($callback_consumer) {
            call_user_func($callback_consumer, $msg);
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        $this->declareQueue($queue_name, false, true, false, false);

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);

        $this->wait();
        $this->allClose();
    }

    /****************************发布-订阅模式*******************************************/
    /**
     * 发布订阅——生产者
     * 1 建立一个交换机
     * 2 生成消息
     * 3 推送到交换机
     *
     * @param string $exchange_name 交换机名称
     * @param string $msg_data 消息内容
     */
    public function pubsubProduct($exchange_name, $msg_data)
    {

        $this->declareExchange($exchange_name, self::FANOUT);

        if (empty($msg_data)) {
            $msg_data = $this->msg_data;
        }
        $msg = $this->amqpMessage($msg_data);

        $this->channel->basic_publish($msg, $exchange_name);

        $this->allClose();
    }


    /**
     * 发布订阅——消费者
     * 1 声明交换机
     * 2 创建临时队列——exclusive为true的对列,叫排外队列,当exchannel断开,队列同时delete
     * 3 队列绑定交换机
     * 4 消费者等待处理
     *
     * @param string $exchange_name 交换机名称
     * @param string $callback_consumer 回调函数
     */
    public function pubsubConsume($exchange_name, $callback_consumer)
    {
        $callback = function ($msg) use ($callback_consumer) {
            call_user_func($callback_consumer, $msg);
        };

        $this->declareExchange($exchange_name, self::FANOUT);
        $queue_name = $this->declareQueue('', false, false, true, false);
        $this->queueBind($queue_name, $exchange_name);

        $this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        $this->wait();
        $this->allClose();

    }


    /****************************路由模式*******************************************/
    /**
     * 路由模式-生产者
     * 1 声明交换机
     * 2 生成消息
     * 3 提交到交换机,声明交换机规则
     *
     * @param string $exchange_name
     * @param string $msg_data
     * @param string $routing_key
     */
    public function routProduct($exchange_name, $msg_data, $routing_key)
    {

        $this->declareExchange($exchange_name, self::DIRECT);
        if (empty($msg_data)) {
            $msg_data = $this->msg_data;
        }
        $msg = $this->amqpMessage($msg_data);
        $this->channel->basic_publish($msg, $exchange_name, $routing_key);

        $this->allClose();
    }

    /**
     * 发布订阅——消费者
     * 1 声明交换机
     * 2 创建临时队列
     * 3 队列绑定交换机————一个队列一次只能绑定一个规则.如果一个队列有多个规则,需要循环绑定
     * 4 消费者等待处理
     *
     * @param string $exchange_name 交换机名称
     * @param array  $binding_keys 路由规则
     * @param string $callback_consumer 回调函数
     *
     */
    public function routConsume($exchange_name, $binding_keys = [], $callback_consumer)
    {

        //回调处理
        $callback = function ($msg) use ($callback_consumer) {
            call_user_func($callback_consumer, $msg);
        };

        $this->declareExchange($exchange_name, self::DIRECT);
        $queue_name = $this->declareQueue('', false, false, true, false);

        if (empty($binding_keys)) {
            exit($this->rout_error_msg);
        }

        foreach ($binding_keys as $binding_key) {
            $this->queuebind($queue_name, $exchange_name, $binding_key);
        }

        $this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        $this->wait();
        $this->allClose();

    }

    /****************************主题模式*******************************************/
    /**
     * 主题模式-生产者
     * 1 声明交换机
     * 2 生成消息
     * 3 提交到交换机,声明交换机规则
     *
     * @param string $exchange_name
     * @param string $msg_data
     * @param string $routing_key
     *
     *
     */
    public function topicProduct($exchange_name, $msg_data, $routing_key)
    {

        $this->declareExchange($exchange_name, self::TOPIC);

        if (empty($msg_data)) {
            $msg_data = $this->msg_data;
        }
        $msg = $this->amqpMessage($msg_data);

        $this->channel->basic_publish($msg, $exchange_name, $routing_key);

        $this->allClose();

    }

    /**
     * 主题——消费者
     * 1 声明交换机
     * 2 创建临时队列
     * 3 队列绑定交换机————一个队列一次只能绑定一个规则.如果一个队列有多个规则,需要循环绑定
     * 4 消费者等待处理
     *
     * 返回msg属性
     * {
     *      body  内容
     *      body_size  内容大小
     *      is_truncated
     *      content_encoding
     *      delivery_info{
     *          channel  objet
     *          consumer_tag  消费者标记
     *          delivery_tag
     *          redelivered
     *          exchange  交换机名称
     *          routing_key  队列绑定的路由规则
     *      }
     * }
     *
     * 匹配规则
     * 1 # 所有的都匹配
     * 2 *.info     匹配.info结尾的路由
     *   info.*     匹配info.开头的路由
     * *.info.*     匹配中间.info.的路由
     * 注意:当用*号的匹配,点号.也是匹配的一部分,但只能匹配一个点号
     *      当用#号的匹配,可以匹配多个点号
     *
     * @param string $exchange_name 交换机名称
     * @param array  $binding_keys 路由规则
     * @param string $callback_consumer 回调函数
     *
     */
    public function topicConsume($exchange_name, $binding_keys = [], $callback_consumer)
    {
        $callback = function ($msg) use ($callback_consumer) {
            call_user_func($callback_consumer, $msg);
        };

        $this->declareExchange($exchange_name, self::TOPIC);
        $queue_name = $this->declareQueue('', false, false, true, false);


        if (empty($binding_keys)) {
            exit($this->rout_error_msg);
        }

        foreach ($binding_keys as $binding_key) {
            $this->queueBind($queue_name, $exchange_name, $binding_key);
        }

        $this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        $this->wait();
        $this->allClose();

    }

    /**
     * 自定义模式————生产者
     * 采取主题模式
     *
     * @author 无花
     * @date   2017/09/21
     * @modified guojing 2017/09/22 - 补上方法注释
     *
     * @param string $exchange_name 交换机名
     * @param mixed  $msg_data      消息内容
     * @param string $routing_key   路由key
     * @param string $rout          模式 - 默认主题模式
     *
     */
    public  function cusProduct($exchange_name, $msg_data , $routing_key , $rout = self::TOPIC)
    {
        // ============== 2017/09/21 added by guojing - 添加发布事件记录 /start/=============== //

        if (empty($msg_data)) {
            $msg_data = $this->msg_data;
        } else {
            // mq发布事件落地
            $S_PUBLISH = new PublishService();

            // 如果$msg_data是json字符就使用单推
            if (is_string($msg_data)) {
                // 生成当前消息的发布ID
                $temp_msg_data = json_decode($msg_data, true);
                if (!isset($temp_msg_data['publish_id']) && isset($temp_msg_data['theme_id']) && !empty($temp_msg_data['theme_id'])) {
                    // 回调推送落地记录
                    $publish_res = $S_PUBLISH->savePublish($msg_data, 1); // 调用mongo记录回调推送结果
                    if ($publish_res['status'] == 1) $msg_data = $publish_res['message'];
                }
            } else {
                // 如果是数组就使用批量推送
                foreach ($msg_data as $key=>&$msgs) {

                    // 生成当前消息的发布ID
                    $temp_msg_data = json_decode($msgs, true);
                    if (!isset($temp_msg_data['publish_id']) && isset($temp_msg_data['theme_id']) && !empty($temp_msg_data['theme_id'])) {
                        // 回调推送落地记录
                        $publish_res = $S_PUBLISH->savePublish($msgs, 1); // 调用mongo记录回调推送结果
                        if ($publish_res['status'] == 1) $msgs = $publish_res['message'];
                    }
                }
            }
        }

        // 定义交换机
        $this->declareExchange($exchange_name, $rout ,false ,true);

        // mq没有应答时的回调函数 - 推送消息丢失
        $this->channel->set_nack_handler(
            function (AMQPMessage $message) {
                // 回调mongo推送落地记录
                $S_PUBLISH = new PublishService();
                $S_PUBLISH->savePublish($message->body, 2); // 调用mongo记录回调推送结果
            }
        );

        // mq无法进入队列时的回调 - 消息入队失败
        $this->channel->set_return_listener(
            function ($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $message) {
                // 回调mongo推送落地记录
                $S_PUBLISH = new PublishService();
                $S_PUBLISH->savePublish($message->body, 0, $replyCode, $replyText, $exchange, $routingKey); // 调用mongo记录回调推送结果
            }
        );

        $this->channel->confirm_select(); // 开启应答模式

        // 如果$msg_data是json字符就使用单推
        if (is_string($msg_data)) {
            $msg = $this->amqpMessage($msg_data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            $this->channel->basic_publish($msg, $exchange_name, $routing_key, true);
        } else {
            // 如果是数组就使用批量推送
            foreach ($msg_data as $msgs_string) {
                $msg = $this->amqpMessage($msgs_string, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
                $this->channel->basic_publish($msg, $exchange_name, $routing_key, true);
            }
        }

        //落地操作————失败重复,成功改变状态1
        $this->channel->wait_for_pending_acks_returns(); // 等待mq应答消息推送

        // ============== 2017/09/21 added by guojing - 添加发布事件记录 /end/=============== //

        $this->allClose();
    }


    /**
     * 自定义模式————消费者——一般处理（是否要绑定队列）
     *
     * @author 无花
     * @modified guojing 2017/09/22 - 补上方法注释, 添加消息落地流程
     *
     * @param string $exchange_name     交换机名
     * @param array  $binding_keys      绑定路由
     * @param string $queue_name        队列名
     * @param string $callback_consumer 回调函数
     * @param string $rout              模式 - 默认主题
     */
    public function cusComConsume($exchange_name, $binding_keys = array(), $queue_name = '', $callback_consumer, $rout = self::TOPIC)
    {
        $callback = function($msg) use ($callback_consumer){
            call_user_func($callback_consumer, $msg );
        };

        $this->declareExchange($exchange_name, $rout ,false ,true);
        $queue_name = $this->declareQueue($queue_name, false, false, true, false);


        if( empty($binding_keys )) {
            exit($this->rout_error_msg);
        }

        foreach($binding_keys as $binding_key) {
            $this->queueBind($queue_name, $exchange_name, $binding_key);
        }

        $this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        $this->wait();
        $this->allClose();

    }

    /**
     * 自定义模式————消费者——工作模式
     *
     * @author 无花
     * @modified guojing 2017/09/22 - 补上方法注释, 添加消息落地流程
     *
     * @param string $exchange_name     交换机名
     * @param array  $binding_keys      绑定路由
     * @param string $queue_name        队列名
     * @param string $callback_consumer 回调函数
     * @param string $rout              模式 - 默认主题
     */
    public function cusWorkConsume($exchange_name, $binding_keys = array(), $queue_name, $callback_consumer, $rout = self::TOPIC)
    {
        $callback = function($msg) use ($callback_consumer){
            call_user_func($callback_consumer, $msg );
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        //声明交换机
        $this->declareExchange($exchange_name, $rout ,false,true);

        //声明队列
        $this->declareQueue($queue_name, false, true, false, false);
        //$queue_name = $this->declareQueue('', false, false, true, false);


        if( empty($binding_keys )) {
            exit($this->rout_error_msg);
        }
        //绑定路由key
        foreach($binding_keys as $binding_key) {
            $this->queueBind($queue_name, $exchange_name, $binding_key);
        }

        //#若存在多个consumer每个consumer的负载可能不同，有些处理的快有些处理的慢
        //#RabbitMQ并不管这些，只是简单的以round-robin的方式分配message
        //#这可能造成某些consumer积压很多任务处理不完而一些consumer长期处于饥饿状态
        //#可以使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer处理并确认了上一个message后才分配新的message给他
        //#否则分给另一个空闲的consumer
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);
        //$this->channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        $this->wait();
        $this->allClose();

    }

}

?>
